package com.atguigu.chapter11;

import com.atguigu.bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Author: Pepsi
 * Date: 2023/8/24
 * Desc: Flink SQL file-connector demo — registers a CSV-backed source table
 *       ("sensor") and a CSV-backed sink table ("abc") via the filesystem
 *       connector, filters the source for id = 'sensor_1', and writes the
 *       filtered rows into the sink.
 */
public class Flink08_SQL_File {

    public static void main(String[] args) {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Single parallel subtask so the file sink produces a single output file.
        env.setParallelism(1);

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Source table: reads CSV rows (id, ts, vc) from input/sensor.txt.
        tEnv.executeSql("create table sensor(" +
                " id string, " +
                " ts bigint, " +
                " vc int " +
                ")with(" +
                " 'connector' = 'filesystem', " +
                " 'path' = 'input/sensor.txt', " +
                " 'format' = 'csv' " +
                ")");

        // Continuous query: keep only rows for sensor_1.
        Table result = tEnv.sqlQuery("select * from sensor where id = 'sensor_1'");

        // Sink table: same column list, in the same order, as the source table.
        tEnv.executeSql("create table abc(" +
                "id string," +
                "ts bigint," +
                "vc int)" +
                "with(" +
                "'connector'='filesystem'," +
                "'path'='input/c.txt'," +
                "'format'='csv')");

        // executeInsert maps query columns to sink columns by POSITION, not by
        // name, so the source and sink tables must declare fields in the same
        // order. Field names themselves are not validated against each other.
        // NOTE(review): executeInsert submits the job asynchronously; a demo
        // that must block until the file is written would call .await() on the
        // returned TableResult — confirm whether that is desired here.
        result.executeInsert("abc");
    }
}
